-
Notifications
You must be signed in to change notification settings - Fork 23
[Maulik] batch commits from offset commit worker #45
Conversation
fb2a52f
to
5c765c0
Compare
int offsetClubbedBatches = 0; | ||
while (true) { | ||
Records commitOffset = commitQueue.poll(queueConfig.getTimeout(), queueConfig.getTimeoutUnit()); | ||
if (stopped || clock.currentEpochMillis() - start > offsetState.getOffsetCommitTime()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like getOffsetCommitTime
is more of a timeout then a commit time, if I am correct, can we name this accordingly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OFFSET_COMMIT_TIME is the time till which we will accumulate the batches and then commit. It is not a timeout, it's a time after which we will commit the offsets to kafka.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, got it. Then I think we should name it something like getOffsetBatch(Time/Duration)? This is the time the batching process will take until the worker flushes it to Kafka.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
offset commit time looked more appropriate, that is the time after which we commit offsets to kafka.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the configuration is changed to offsetBatchDuration
if (partitionsCommitOffset.size() != 0) { | ||
kafkaCommitter.commitSync(partitionsCommitOffset); | ||
} | ||
log.info("committed offsets partition {} size {}", partitionsCommitOffset.toString(), partitionsCommitOffset.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this log really helpful? Can we mark this either debug or remove serializing the whole map to a string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we will be committing after batching commits, we want to log when the offsets were committed, there is no other info log in the OffsetCommitter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mauliksoneji Yeah, that's fine, I am just wondering what's the use of seeing offset values per partition in the log, we can just log the size of that map?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't have much information, the maximum number of keys would be the topic-partition subscribed by the pod, and there is only max offset in the value, this is not that much information.
5c765c0
to
f24e943
Compare
Codecov Report
@@ Coverage Diff @@
## master #45 +/- ##
============================================
+ Coverage 81.73% 82.56% +0.82%
+ Complexity 257 255 -2
============================================
Files 52 52
Lines 805 820 +15
Branches 73 71 -2
============================================
+ Hits 658 677 +19
+ Misses 126 120 -6
- Partials 21 23 +2
Continue to review full report at Codecov.
|
Signed-off-by: mauliksoneji <[email protected]>
f24e943
to
19c0b8a
Compare
LGTM |
Address Issue #44 by introducing batch commits in OffsetCommitWorker